OPC Studio User's Guide and Reference
Example Walkthrough

In this chapter, we explain how to use StreamInsight Extensions for QuickOPC, using one of the examples that come with the product: the SimpleDAStreamInsightApplication project in C#. This example uses OPC Data Access; the other examples, for OPC Alarms and Events and for OPC Unified Architecture, are quite similar.

To make it easy to understand the specifics of StreamInsight Extensions for QuickOPC, and to stay away from the parts that are common to all StreamInsight applications, we have created this set of simple examples by deriving them from the examples that Microsoft uses to teach StreamInsight. We can thus avoid repeating information that is already available from Microsoft.

Our example exposes an embedded server from inside a console application, and also runs the query and displays the results. It is based on Microsoft’s “StreamInsight Example: Server - Exposing an Embedded Server”, http://msdn.microsoft.com/en-us/library/hh995352(v=sql.111).aspx . It is recommended that you study Microsoft’s example first.

Introductory Steps

The first steps are the same as in the Microsoft example.
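
For orientation, the introductory steps of the Microsoft example can be sketched roughly as follows: create an embedded server, expose its management service over WCF, and create an application. This is a sketch, not the shipped project code. The instance name "Default", the endpoint URL, and the application name "serverApp" are assumptions modeled on the Microsoft example; substitute the values of your own StreamInsight installation.

```csharp
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.ManagementService;

class Program
{
    static void Main()
    {
        // Create an embedded StreamInsight server. "Default" is an assumed
        // instance name; it must match an instance installed on the machine.
        using (var server = Server.Create("Default"))
        {
            // Expose the embedded server to other StreamInsight clients.
            // The endpoint URL is an assumption.
            var host = new ServiceHost(server.CreateManagementService());
            host.AddServiceEndpoint(
                typeof(IManagementService),
                new WSHttpBinding(SecurityMode.Message),
                "http://localhost/MyStreamInsightServer");
            host.Open();

            // Create the application that holds the source, query, and sink.
            // "serverApp" is an assumed name.
            var myApp = server.CreateApplication("serverApp");

            // ... the steps described in the following sections go here ...

            host.Close();
        }
    }
}
```

The myApp variable created here is the one on which the source and the sink are defined in the sections that follow.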

Define and Deploy a Source

In the next step, an input source is defined and deployed to the (StreamInsight) server with a name so that it can be used by other StreamInsight clients. In this example, the data is a simple temporal stream of point events generated by the OPC Data Access server.

The DAItemChangedObservable.Create<int> method creates an observable that generates a sequence of EasyDAItemChangedEventArgs<int> objects. Each notification contains, among other information, the value of a given integer OPC item (if available), as delivered by the OPC server. Because the EasyDAItemChangedEventArgs<int> objects are not directly suitable as StreamInsight event payloads, we convert them to DAItemChangedPayload<int>, which is a pre-made payload class provided by the StreamInsight Extensions for QuickOPC.

    // DEFINE a simple SOURCE (returns a point event every second)
    const string machineName = "";
    const string serverClass = "OPCLabs.KitServer.2";
    const string itemId = "Simulation.Incrementing (1 s)";
    var observable = DAItemChangedObservable.Create<int>(machineName, 
        serverClass, itemId, 100);
    var mySource = myApp
        .DefineObservable(() => observable)
        .ToPointStreamable(
            eventArgs => 
                PointEvent.CreateInsert(DateTimeOffset.Now, 
                (DAItemChangedPayload<int>)eventArgs),
            AdvanceTimeSettings.StrictlyIncreasingStartTime);

Compose a Query over the Source

Next, compose a query over the input source. The query uses LINQ as the query specification language. In this example, the query returns the events where the value of the OPC item is an even number. 

    // Compose a QUERY over the source 
    // (return every event carrying even data value)
    var myQuery = from e in mySource
                  where e.VtqPayload.Value % 2 == 0
                  select e;

Technically, this definition translates to a filter operator that drops from the sequence all events that do not fulfill the filter predicate (where e.VtqPayload.Value % 2 == 0) and returns the remaining events unchanged (select e). For more information about LINQ query operators, see Using StreamInsight LINQ.
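
To see the effect of the predicate in isolation, the same where clause can be tried on an ordinary in-memory sequence. The sketch below uses plain LINQ to Objects and involves neither StreamInsight nor OPC; the values array is a hypothetical stand-in for the values carried by successive item-change events.

```csharp
using System;
using System.Linq;

class EvenFilterSketch
{
    static void Main()
    {
        // Hypothetical stand-in for the values of successive events.
        int[] values = { 1, 2, 3, 4, 5, 6 };

        // Same predicate as in the StreamInsight query:
        // keep only the even values, drop the rest.
        var even = from v in values
                   where v % 2 == 0
                   select v;

        Console.WriteLine(string.Join(", ", even)); // prints "2, 4, 6"
    }
}
```

The difference is in how the query is evaluated: LINQ to Objects filters the array in memory, whereas StreamInsight translates the query into an operator that runs in the server over the temporal stream.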

Define and Deploy a Sink

Next, an output sink is created that can be bound to the query and processes the resulting sequence. In this example, a simple function is created that writes the stream values to the console. The DAItemChangedPayload<int> class has a convenient ToString() method that returns the relevant payload information formatted for output.

    // DEFINE a simple observer SINK (writes the value to the server console)
    var mySink = myApp.DefineObserver(() => 
        Observer.Create<DAItemChangedPayload<int>>(
            payload => Console.WriteLine("sink_Server..: {0}", payload)));

The sink is then deployed to the server with a name, in the same way as in the original Microsoft example.

Bind and Run the Query and Sink

The remainder of the example does nothing specific to OPC. The observable query is bound to the observer output sink, and the query is then run in a process in the server. This process continues to run until the user stops it by typing in the console.
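
Following the Microsoft example, the binding and the run can be sketched as below. This is a fragment, not a complete program: it continues inside the same method as the earlier code, myQuery and mySink come from the previous steps, and the process name "serverProcess" and the console message are assumptions.

```csharp
    // BIND the query to the sink and RUN it as a named process in the server.
    // "serverProcess" is an assumed name. The running process is stopped
    // when the using block is left.
    using (myQuery.Bind(mySink).Run("serverProcess"))
    {
        // Keep the process running until the user presses Enter.
        Console.WriteLine("The server is running, press Enter to stop it");
        Console.ReadLine();
    }
```

Wrapping the running process in a using block is a design choice of this sketch; it makes the end of the process explicit instead of relying on the disposal of the server.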
